Setup Methods¶
These cells must be run prior to any of the other sections below. They load the runtime with the function calls needed for the later cells.
In [1]:
import plotly.express as px
import pandas as pd
def read_data():
stk_data_parsed = []
stk_headers = ["id1", "id2", "id3", "country",
"sat_type", "id4", "num1", "num2", "num3", "num4",
"sat_status", "fetched_date"]
with open("data/stk.txt") as stk_file_text:
# Append the headers to the parsed data.
for line in stk_file_text:
# Pandas was having trouble reading this in so just parse by column width in converted .txt doc.
id1 = line[0:20].rstrip()
id2 = line[20:35].rstrip()
id3 = line[35:46].rstrip()
country = line[46:56].rstrip()
sat_type = line[56:66].rstrip()
id4 = line[66:116].rstrip()
num1 = line[116:121].rstrip()
num2 = line[121:127].rstrip()
num3 = line[127:132].rstrip()
num4 = line[132:141].rstrip()
sat_status = line[141:149].rstrip()
fetched_date = line[149:].rstrip()
data_agg = [
id1,
id2,
id3,
country,
sat_type,
id4,
num1,
num2,
num3,
num4,
sat_status,
fetched_date
]
stk_data_parsed.append(data_agg)
stk_data = pandas.DataFrame(stk_data_parsed, columns=stk_headers)
sat_data = pandas.read_csv("data/SATCAT.csv")
# Merge datasets and filter out decayed satellites
merged_data = sat_data.merge(stk_data, how='inner', left_on='INTLDES', right_on='id3')
merged_data = merged_data[merged_data['DECAY'].isnull()]
merged_data.to_csv('data/merged.csv')
return merged_data
def plot_satellites_overlapping(merged_data, x_trade_space_altitudes_km=None):
x_trade_space_altitudes_km = x_trade_space_altitudes_km or [i for i in range(0, 2000, 5)]
y_satelites_overlapping = []
start_x = x_trade_space_altitudes_km[0]
end_x = x_trade_space_altitudes_km[-1]
for altitude in x_trade_space_altitudes_km:
# Get all sats within the perigee/apogee of the altitude provided.
sats_within_range = merged_data[(merged_data['PERIGEE'] <= altitude)
& (merged_data['APOGEE'] >= altitude)]
y_satelites_overlapping.append(len(sats_within_range))
fig = px.bar(x=x_trade_space_altitudes_km, y=y_satelites_overlapping,
title=f'Sats overlapping (y) for altitudes (x) within range of [{start_x}, {end_x}] km',
labels={'x': 'Altitude (km)', 'y':'Number of Overlapping Satellites'})
fig.show()
In [2]:
import requests
import pandas
import json
import os
SPACETRACK_LOGIN_URL = "https://www.space-track.org/ajaxauth/login"
SPACETRACK_TLE_QUERY_URL = "https://www.space-track.org/basicspacedata/query/class/tle_publish/PUBLISH_EPOCH/%3Enow-30/orderby/PUBLISH_EPOCH%20asc/emptyresult/show"
# By default just use the files we have. Otherwise fetch the latest TLE's.
def load_tles(username, password, override=False):
if not override and os.path.exists("data/tles.json"):
with open ("data/tles.json", "r") as tle_json:
return json.load(tle_json)
with requests.Session() as session:
resp = session.post(SPACETRACK_LOGIN_URL, data={'identity':username,'password':password})
print(resp.__dict__)
# Query for recent TLE's.
result = session.get(SPACETRACK_TLE_QUERY_URL, auth=(username, password))
tle_data = result.json()
with open("data/tles.json", "w+") as tle_json:
json.dump(tle_data, tle_json, indent=4)
if not tle_data:
raise Error("Bad request")
return tle_data
def merge_tles_to_dataframe(data, username, password):
tle_json = load_tles(username, password)
tle_dataframe = pandas.DataFrame(tle_json)
# Add TLE's to datasets.
merged_data = data[data['DECAY'].isnull()]
print(f"{len(merged_data)} data elements before TLE merge")
# Only grab latest TLE's from the publish epoch. Also need NORAD_CAT_ID's as integers to map to other .csv.
tle_dataframe['NORAD_CAT_ID'] = tle_dataframe['NORAD_CAT_ID'].astype('int64')
tle_dataframe = tle_dataframe.sort_values(by=['PUBLISH_EPOCH'],ascending=[False]).groupby('NORAD_CAT_ID', as_index=False).first()
# Our complete merged dataframe!
merged_data = merged_data.merge(tle_dataframe, how='inner', left_on='NORAD_CAT_ID', right_on='NORAD_CAT_ID')
print(f"{len(merged_data)} data elements after TLE merge")
merged_data.to_csv("data/merged-with-tles.csv")
return merged_data
In [3]:
import os
import datetime
import random
import orekit
from orekit.pyhelpers import datetime_to_absolutedate, absolutedate_to_datetime
from org.hipparchus.geometry.euclidean.threed import Vector3D
from org.hipparchus.linear import RealMatrix
from org.orekit.propagation import StateCovariance
from org.orekit.frames import FramesFactory
from org.orekit.orbits import PositionAngleType, CartesianOrbit, KeplerianOrbit, CircularOrbit, EquinoctialOrbit, OrbitType
from org.orekit.ssa.metrics import ProbabilityOfCollision
from org.orekit.ssa.collision.shorttermencounter.probability.twod import Patera2005
from org.orekit.utils import PVCoordinates
from org.orekit.utils import Constants
from org.hipparchus.linear import MatrixUtils
from org.orekit.utils import IERSConventions, PVCoordinates
from org.hipparchus.linear import MatrixUtils
from orekit.pyhelpers import setup_orekit_curdir
from org.orekit.propagation.analytical.tle import TLE, TLEPropagator
from org.orekit.time import TimeScalesFactory, AbsoluteDate
from datetime import datetime, timedelta
from typing import Any
import subprocess
import multiprocessing
import time
from dataclasses import dataclass
from orethread import handle_task_queue
orekit.pyhelpers.download_orekit_data_curdir()
print("Starting OreKit VM")
vm = orekit.initVM()
setup_orekit_curdir()
print("Done setting up OreKit VM")
def load_from_dataframe(dataframe):
tle_list = []
for index, row in dataframe.iterrows():
tle1 = row['TLE_LINE1']
tle2 = row['TLE_LINE2']
tle_list.append((tle1, tle2))
return tle_list
def dump_queue(q):
q.put(None)
return list(iter(q.get, None))
def determine_collisions(merged_data, tle1, tle2, time_step_seconds=60.0, collision_threshold_km=5, days_forward=1):
# Load TLEs
sat_tle = (tle1, tle2) # Replace with our sat actual TLE
potential_debris_tles = load_from_dataframe(merged_data) # File containing debris TLEs
# # Initialize propagators
# sat_propagator = TLEPropagator.selectExtrapolator(sat_tle)
# debris_propagators = [TLEPropagator.selectExtrapolator(tle) for tle in potential_debris_tles]
ts = TimeScalesFactory.getUTC()
dt = datetime.now()
dt2 = dt + timedelta(days=days_forward)
start_date = AbsoluteDate(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second + dt.microsecond / 1000000.0, ts)
end_date = AbsoluteDate(dt2.year, dt2.month, dt2.day, dt2.hour, dt2.minute, dt2.second + dt2.microsecond / 1000000.0, ts)
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
worker_count = 16
workers = []
print("Starting collision computations. This may take some time.")
current_date = start_date
jobs = 0
while current_date.compareTo(end_date) <= 0:
task_queue.put(absolutedate_to_datetime(current_date))
current_date = current_date.shiftedBy(time_step_seconds)
jobs += 1
print(f"Executing {jobs} jobs")
for _ in range(worker_count):
task_queue.put(None)
for _ in range(worker_count):
p = multiprocessing.Process(target=handle_task_queue, args=(task_queue, result_queue, sat_tle, potential_debris_tles, collision_threshold_km))
workers.append(p)
p.start()
for p in workers:
p.join()
collision_count = len(dump_queue(result_queue))
print(f"{collision_count} total collisions possible during specified time frame")
Downloading file from: https://gitlab.orekit.org/orekit/orekit-data/-/archive/master/orekit-data-master.zip Starting OreKit VM Done setting up OreKit VM
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
Load Data¶
Run this cell once and then use it for subsequent cells.
In [4]:
spacetrack_username = "" # TODO - Replace
spacetrack_password = "" # TODO - Replace
provided_data = read_data()
merged_tle_data = merge_tles_to_dataframe(provided_data, spacetrack_username, spacetrack_password)
31519 data elements before TLE merge 26985 data elements after TLE merge
Determine number of overlapping sats for given altitudes¶
In [5]:
# Altitudes between n1 and n2 km, incrementing n3 km per iteration.
plot_satellites_overlapping(provided_data, [i for i in range(0, 1000+1, 2)])
plot_satellites_overlapping(provided_data, [i for i in range(0, 10000+1, 20)])
plot_satellites_overlapping(provided_data, [i for i in range(0, 50000+1, 100)])
Determine Collisions¶
After the trade analysis has been done and an orbit selected you just need to update this cell with your TLE's and call the propogator.
In [6]:
import math
from org.orekit.propagation.analytical.tle.generation import FixedPointTleGenerationAlgorithm, LeastSquaresTleGenerationAlgorithm
from org.orekit.propagation import SpacecraftState
def create_tle_from_orbital_elements(a, e, i, omega, raan, lv):
# These are just for the template format. DO NOT CHANGE THESE.
sat_tle1 = "1 25544U 98067A 25055.73635937 .00029091 00000-0 52662-3 0 9995"
sat_tle2 = "2 25544 51.6356 143.9899 0006112 310.6589 49.3868 15.49395806497720"
ts = TimeScalesFactory.getUTC()
dt = datetime.now()
date = AbsoluteDate(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second + dt.microsecond / 1000000.0, ts)
inertial_frame = FramesFactory.getEME2000()
orbit = KeplerianOrbit(a, e, i, omega, raan, lv, PositionAngleType.TRUE, inertial_frame, date, Constants.IAU_2015_NOMINAL_EARTH_GM)
#orbit = EquinoctialOrbit(orbit)
template_tle = TLE(sat_tle1, sat_tle2)
tle_generator = LeastSquaresTleGenerationAlgorithm(1_000_000)
generated_tle = TLE.stateToTLE(SpacecraftState(orbit), template_tle, tle_generator);
return generated_tle.getLine1(), generated_tle.getLine2()
# TODO - Replace me with your orbit information
a = 500_000.0 # Semi-major axis in meters
e = 0.000 # Eccentricity
i = math.radians(97.0) # Inclination in radians
omega = math.radians(90.0) # Argument of perigee in radians
raan = math.radians(20.0) # Right ascension of the ascending node in radians
lv = math.radians(0.0) # True anomaly in radians
# Update with our TLE's.
#sat_tle1, sat_tle2 = create_tle_from_orbital_elements(a, e, i, omega, raan, lv)
#print(sat_tle1)
#print(sat_tle2)
# TODO - replace with your TLE's.
sat_tle1 = "1 25544U 98067A 25055.73635937 .00029091 00000-0 52662-3 0 9995"
sat_tle2 = "2 25544 51.6356 143.9899 0006112 310.6589 49.3868 15.49395806497720"
# Check for any possible collisions within 1 day, with steps of 60 second, with 'collisions' being satellites within 10 km.
# You can modify these parameters as you see fit. Ideally you would want sub-second time steps over a 2-3 week period, but
# my feeble little macbook won't run that very fast - even though I made it to use multiple CPU cores.
# Normally you would have a larger machine and propogate over a period of weeks, while constantly refreshing orbital data on subsequent runs.
# TLE's are also generally inaccurate and can drift fairly quickly - so while this method is not perfect, it should give a small degree
# of confidence in the trade space.
determine_collisions(merged_tle_data, sat_tle1, sat_tle2, time_step_seconds=60.0, collision_threshold_km=10, days_forward=1)
Starting collision computations. This may take some time. Executing 1441 jobs
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed. OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
Thread: 83de7f0c-b582-4b92-8c4d-3981fca78322 has processed 100 jobs. Thread: eaffd000-36ca-49e4-aac8-57abf784cd31 has processed 100 jobs. Thread: aea6cec5-5315-4fe0-ba1e-76dceb15b5bd has processed 100 jobs. 0 total collisions possible during specified time frame
Categorize Debris¶
This is just a cell to categorize the debris in the selected orbit altitude and then plot the types of debris.
In [7]:
def categorize_debris_at_altitude(altitude_km, merged_data):
# Get sats within range.
sats_within_range = merged_data[(merged_data['PERIGEE'] <= altitude_km)
& (merged_data['APOGEE'] >= altitude_km)]
# Fitler out decayed satellites.
sats_within_range = sats_within_range[sats_within_range['DECAY'].isnull()]
# rocket body, debris, inactive satellites, and active satellites.
rocket_body_count = len(sats_within_range[sats_within_range['OBJECT_TYPE'] == "ROCKET BODY"]) # OBJECT_TYPE = ROCKET BODY
debris_count = len(sats_within_range[sats_within_range['OBJECT_TYPE'] == "DEBRIS"]) # OBJECT_TYPE = DEBRIS
active_satellites = len(sats_within_range[(sats_within_range['OBJECT_TYPE'] == "PAYLOAD") &
(sats_within_range['sat_status'] == "Active")]) # OBJECT_TYPE = PAYLOAD, sat_status = Inactive
inactive_satellites = len(sats_within_range[(sats_within_range['OBJECT_TYPE'] == "PAYLOAD") &
(sats_within_range['sat_status'] == "Inactive")]) # OBJECT_TYPE = PAYLOAD, sat_status = Inactive
unknown_satellites = len(sats_within_range[(sats_within_range['OBJECT_TYPE'] == "PAYLOAD") &
(sats_within_range['sat_status'] == "Unknown")]) # OBJECT_TYPE = PAYLOAD, sat_status = Unknown
keys = ["Rocket Body", "Debris", "Active Sat", "Inactive Sat", "Unknown Sat"]
values = [rocket_body_count, debris_count, active_satellites, inactive_satellites, unknown_satellites]
print([f"{keys[i]}:{values[i]}" for i in range(0, len(values))])
fig = px.bar(x=keys,
y=values,
title=f'Sats overlapping (y) of altitude {altitude_km}km categorized',
labels={'x': 'Category', 'y':'Number of Overlapping Satellites'})
fig.show()
categorize_debris_at_altitude(altitude_km=275, merged_data=provided_data)
['Rocket Body:192', 'Debris:89', 'Active Sat:15', 'Inactive Sat:14', 'Unknown Sat:15']
In [ ]: